# BSD 2-Clause License
#
# Apprise - Push Notification Library.
# Copyright (c) 2025, Chris Caron <lead2gold@gmail.com>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
#    this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
#    this list of conditions and the following disclaimer in the documentation
#    and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

import asyncio
import concurrent.futures
import inspect
from inspect import cleandoc

# Disable logging for a cleaner testing output
import logging
from os.path import dirname, join
import re
import sys
from typing import ClassVar
from unittest import mock

from helpers import OuterEventLoop
import pytest
import requests

from apprise import (
    Apprise,
    AppriseAsset,
    AppriseAttachment,
    NotificationManager,
    NotifyBase,
    NotifyFormat,
    NotifyImageSize,
    NotifyType,
    PrivacyMode,
    URLBase,
    __version__,
)
from apprise.locale import LazyTranslation, gettext_lazy as _
from apprise.plugins.base import RequirementsSpec
from apprise.utils.parse import parse_list

logging.disable(logging.CRITICAL)

# Attachment Directory
TEST_VAR_DIR = join(dirname(__file__), "var")

# Grant access to our Notification Manager Singleton
N_MGR = NotificationManager()


def test_apprise_object():
    """
    API: Apprise() object

    """

    def do_notify(server, *args, **kwargs):
        return server.notify(*args, **kwargs)

    apprise_test(do_notify)


def test_apprise_async():
    """
    API: Apprise() object asynchronous methods

    """
    with OuterEventLoop() as loop:

        def do_notify(server, *args, **kwargs):
            return loop.run_until_complete(
                server.async_notify(*args, **kwargs)
            )

        apprise_test(do_notify)


def apprise_test(do_notify):
    a = Apprise()

    # no items
    assert len(a) == 0

    # Apprise object can also be directly tested with 'if' keyword
    # No entries results in a False response
    assert not a

    # Create an Asset object
    asset = AppriseAsset(theme="default")

    # We can load the device using our asset
    a = Apprise(asset=asset)

    # We can load our servers up front as well
    servers = [
        "json://myhost",
        "kodi://kodi.server.local",
    ]

    a = Apprise(servers=servers)

    # 2 servers loaded
    assert len(a) == 2

    # Apprise object can also be directly tested with 'if' keyword
    # At least one entry results in a True response
    assert a

    # We can retrieve our URLs this way:
    assert len(a.urls()) == 2

    # We can add another server
    assert (
        a.add(
            "mmosts://mattermost.server.local/3ccdd113474722377935511fc85d3dd4"
        )
        is True
    )
    assert len(a) == 3

    # Try adding nothing but delimiters
    assert a.add(",, ,, , , , ,") is False

    # The number of servers added doesn't change
    assert len(a) == 3

    # We can pop an object off of our stack by it's indexed value:
    obj = a.pop(0)
    assert isinstance(obj, NotifyBase) is True
    assert len(a) == 2

    # We can retrieve elements from our list too by reference:
    assert isinstance(a[0].url(), str) is True

    # We can iterate over our list too:
    count = 0
    for o in a:
        assert isinstance(o.url(), str) is True
        count += 1
    # verify that we did indeed iterate over each element
    assert len(a) == count

    # We can empty our set
    a.clear()
    assert len(a) == 0

    # An invalid schema
    assert a.add("this is not a parseable url at all") is False
    assert len(a) == 0

    # An unsupported schema
    assert a.add("invalid://we.just.do.not.support.this.plugin.type") is False
    assert len(a) == 0

    # A poorly formatted URL
    assert a.add("json://user:@@@:bad?no.good") is False
    assert len(a) == 0

    # Add a server with our asset we created earlier
    assert (
        a.add(
            "mmosts://mattermost.server.local/"
            "3ccdd113474722377935511fc85d3dd4",
            asset=asset,
        )
        is True
    )

    # Clear our server listings again
    a.clear()
    assert len(a) == 0

    # No servers to notify
    assert do_notify(a, title="my title", body="my body") is False

    # More Variations of Multiple Adding of URLs
    a = Apprise()
    assert a.add(servers)
    assert len(a) == 2
    a.clear()

    assert a.add("ntfys://user:pass@host/test, json://localhost")
    assert len(a) == 2
    a.clear()

    assert a.add(["ntfys://user:pass@host/test", "json://localhost"])
    assert len(a) == 2
    a.clear()

    assert a.add(("ntfys://user:pass@host/test", "json://localhost"))
    assert len(a) == 2
    a.clear()

    assert a.add({"ntfys://user:pass@host/test", "json://localhost"})
    assert len(a) == 2
    a.clear()

    # Pass a list entry containing 1 string with 2 elements in it
    # Mimic Home-Assistant core-2024.2.1 Issue:
    #   - https://github.com/home-assistant/core/issues/110242
    #
    # In this case, the first one will load, but not the second entry
    # This is by design; but captured here to illustrate the issue
    assert a.add(["ntfys://user:pass@host/test, json://localhost"])
    assert len(a) == 1
    assert a[0].url().startswith("ntfys://")
    a.clear()

    # Following thorugh with the problem of providing a list containing
    # an entry with 2 URLs in it... while the ntfys parsed okay above,
    # the same can't be said for other combinations.  It's important
    # to always keep strings separately
    assert a.add(["mailto://user:pass@example.com, json://localhost"]) is False
    assert len(a) == 0

    # Showing that the URLs were valid on their own:
    assert a.add(*["mailto://user:pass@example.com, json://localhost"])
    assert len(a) == 2
    assert a[0].url().startswith("mailto://")
    assert a[1].url().startswith("json://")
    a.clear()

    class BadNotification(NotifyBase):
        def __init__(self, **kwargs):
            super().__init__(**kwargs)

            # We fail whenever we're initialized
            raise TypeError()

        @staticmethod
        def parse_url(url, *args, **kwargs):
            # always parseable
            return NotifyBase.parse_url(url, verify_host=False)

    class GoodNotification(NotifyBase):
        def __init__(self, **kwargs):
            super().__init__(notify_format=NotifyFormat.HTML, **kwargs)

        def send(self, **kwargs):
            # Pretend everything is okay
            return True

        @staticmethod
        def parse_url(url, *args, **kwargs):
            # always parseable
            return NotifyBase.parse_url(url, verify_host=False)

    # Store our bad notification in our schema map
    N_MGR["bad"] = BadNotification

    # Store our good notification in our schema map
    N_MGR["good"] = GoodNotification

    # Just to explain what is happening here, we would have parsed the
    # url properly but failed when we went to go and create an instance
    # of it.
    assert a.add("bad://localhost") is False
    assert len(a) == 0

    # We'll fail because we've got nothing to notify
    assert do_notify(a, title="my title", body="my body") is False

    # Clear our server listings again
    a.clear()

    assert a.add("good://localhost") is True
    assert len(a) == 1

    # Bad Notification Types are not accepted
    assert (
        do_notify(a, title="my title", body="my body", notify_type="bad")
        is False
    )

    # Notifications of string type are accepted
    assert (
        do_notify(a, title="my title", body="my body", notify_type="warning")
        is True
    )

    # Notifications of string type are accepted
    assert (
        do_notify(a, title="my title", body="my body", notify_type="warning")
        is True
    )

    # Notifications of string type are accepted
    assert (
        do_notify(a, title="my title", body="my body", notify_type="warning")
        is True
    )

    # Notifications where notify_type is of the NotifyType object is the
    # preferred choice
    assert (
        do_notify(
            a, title="my title", body="my body",
            notify_type=NotifyType.WARNING)
        is True
    )

    # No Title/Body combo's
    assert do_notify(a, title=None, body=None) is False
    assert do_notify(a, title="", body=None) is False
    assert do_notify(a, title=None, body="") is False

    assert do_notify(a, title=5, body=b"bytes") is False
    assert do_notify(a, title=b"bytes", body=10) is False
    assert do_notify(a, title=object(), body=b"bytes") is False
    assert do_notify(a, title=b"bytes", body=object()) is False

    # A Body must be present
    assert do_notify(a, title="present", body=None) is False

    # Other combinations work fine
    assert do_notify(a, title=None, body="present") is True
    assert do_notify(a, title="present", body="present") is True

    # Send Attachment with success
    attach = join(TEST_VAR_DIR, "apprise-test.gif")
    assert (
        do_notify(
            a,
            body="body",
            title="test",
            notify_type=NotifyType.INFO,
            attach=attach,
        )
        is True
    )

    # Send the attachment as an AppriseAttachment object
    assert (
        do_notify(
            a,
            body="body",
            title="test",
            notify_type=NotifyType.INFO,
            attach=AppriseAttachment(attach),
        )
        is True
    )

    # test a invalid attachment
    assert (
        do_notify(
            a,
            body="body",
            title="test",
            notify_type=NotifyType.INFO,
            attach="invalid://",
        )
        is False
    )

    # Repeat the same tests above...
    # however do it by directly accessing the object; this grants the similar
    # results:
    assert (
        do_notify(
            a[0],
            body="body",
            title="test",
            notify_type=NotifyType.INFO,
            attach=attach,
        )
        is True
    )

    # Send the attachment as an AppriseAttachment object
    assert (
        do_notify(
            a[0],
            body="body",
            title="test",
            notify_type=NotifyType.INFO,
            attach=AppriseAttachment(attach),
        )
        is True
    )

    # test a invalid attachment
    assert (
        do_notify(
            a[0],
            body="body",
            title="test",
            notify_type=NotifyType.INFO,
            attach="invalid://",
        )
        is False
    )

    class ThrowNotification(NotifyBase):
        def notify(self, **kwargs):
            # Pretend everything is okay
            raise TypeError()

        async def async_notify(self, **kwargs):
            # Pretend everything is okay (async)
            raise TypeError()

    class RuntimeNotification(NotifyBase):
        def notify(self, **kwargs):
            # Pretend everything is okay
            raise RuntimeError()

        async def async_notify(self, **kwargs):
            # Pretend everything is okay (async)
            raise TypeError()

    class FailNotification(NotifyBase):

        def notify(self, **kwargs):
            # Pretend everything is okay
            return False

        async def async_notify(self, **kwargs):
            # Pretend everything is okay (async)
            raise TypeError()

    # Store our bad notification in our schema map
    N_MGR["throw"] = ThrowNotification

    # Store our good notification in our schema map
    N_MGR["fail"] = FailNotification

    # Store our good notification in our schema map
    N_MGR["runtime"] = RuntimeNotification

    for async_mode in (True, False):
        # Create an Asset object
        asset = AppriseAsset(theme="default", async_mode=async_mode)

        # We can load the device using our asset
        a = Apprise(asset=asset)

        assert a.add("runtime://localhost") is True
        assert a.add("throw://localhost") is True
        assert a.add("fail://localhost") is True
        assert len(a) == 3

        # Test when our notify both throws an exception and or just
        # simply returns False
        assert do_notify(a, title="present", body="present") is False

    # Create a Notification that throws an unexected exception
    class ThrowInstantiateNotification(NotifyBase):
        def __init__(self, **kwargs):
            # Pretend everything is okay
            raise TypeError()

    N_MGR.unload_modules()
    N_MGR["throw"] = ThrowInstantiateNotification

    # Store our good notification in our schema map
    N_MGR["good"] = GoodNotification

    # Reset our object
    a.clear()
    assert len(a) == 0

    # Test our socket details
    # rto = Socket Read Timeout
    # cto = Socket Connect Timeout
    plugin = a.instantiate("good://localhost?rto=5.1&cto=10")
    assert isinstance(plugin, NotifyBase)
    assert plugin.socket_connect_timeout == 10.0
    assert plugin.socket_read_timeout == 5.1

    plugin = a.instantiate("good://localhost?rto=invalid&cto=invalid")
    assert isinstance(plugin, NotifyBase)
    assert plugin.socket_connect_timeout == URLBase.socket_connect_timeout
    assert plugin.socket_read_timeout == URLBase.socket_read_timeout

    # Reset our object
    a.clear()
    assert len(a) == 0

    with pytest.raises(ValueError):
        # Encoding error
        AppriseAsset(encoding="ascii", storage_salt="ボールト")

    with pytest.raises(ValueError):
        # Not a valid storage salt (must be str or bytes)
        AppriseAsset(storage_salt=42)

    # Set our cache to be off
    plugin = a.instantiate("good://localhost?store=no", asset=asset)
    assert isinstance(plugin, NotifyBase)
    assert plugin.url_id(lazy=False) is None
    # Verify our cache is disabled
    assert "store=no" in plugin.url()

    with pytest.raises(ValueError):
        # idlen must be greater then 0
        AppriseAsset(storage_idlen=-1)

    # Create a larger idlen
    asset = AppriseAsset(storage_idlen=32)
    plugin = a.instantiate("good://localhost", asset=asset)
    assert len(plugin.url_id()) == 32

    # Instantiate a bad object
    plugin = a.instantiate(object, tag="bad_object")
    assert plugin is None

    # Instantiate a good object
    plugin = a.instantiate("good://localhost", tag="good")
    assert isinstance(plugin, NotifyBase)

    # Test simple tagging inside of the object
    assert "good" in plugin
    assert "bad" not in plugin

    # the in (__contains__ override) is based on or'ed content; so although
    # 'bad' isn't tagged as being in the plugin, 'good' is, so the return
    # value of this is True
    assert ["bad", "good"] in plugin
    assert {"bad", "good"} in plugin
    assert ("bad", "good") in plugin

    # We an add already substatiated instances into our Apprise object
    a.add(plugin)
    assert len(a) == 1

    # We can add entries as a list too (to add more then one)
    a.add([plugin, plugin, plugin])
    assert len(a) == 4

    # Reset our object again
    a.clear()
    with pytest.raises(TypeError):
        a.instantiate("throw://localhost", suppress_exceptions=False)

    assert len(a) == 0

    assert a.instantiate("throw://localhost", suppress_exceptions=True) is None
    assert len(a) == 0

    #
    # We rince and repeat the same tests as above, however we do them
    # using the dict version
    #

    # Reset our object
    a.clear()
    assert len(a) == 0

    # Instantiate a good object
    plugin = a.instantiate({"schema": "good", "host": "localhost"}, tag="good")
    assert isinstance(plugin, NotifyBase)

    # Test simple tagging inside of the object
    assert "good" in plugin
    assert "bad" not in plugin

    # the in (__contains__ override) is based on or'ed content; so although
    # 'bad' isn't tagged as being in the plugin, 'good' is, so the return
    # value of this is True
    assert ["bad", "good"] in plugin
    assert {"bad", "good"} in plugin
    assert ("bad", "good") in plugin

    # We an add already substatiated instances into our Apprise object
    a.add(plugin)
    assert len(a) == 1

    # We can add entries as a list too (to add more then one)
    a.add([plugin, plugin, plugin])
    assert len(a) == 4

    # Reset our object again
    a.clear()
    with pytest.raises(TypeError):
        a.instantiate(
            {"schema": "throw", "host": "localhost"}, suppress_exceptions=False
        )

    assert len(a) == 0

    assert (
        a.instantiate(
            {"schema": "throw", "host": "localhost"}, suppress_exceptions=True
        )
        is None
    )
    assert len(a) == 0


def test_apprise_pretty_print():
    """
    API: Apprise() Pretty Print tests

    """
    # Privacy Print
    # PrivacyMode.Secret always returns the same thing to avoid guessing
    assert (
        URLBase.pprint(None, privacy=True, mode=PrivacyMode.Secret) == "****"
    )
    assert URLBase.pprint(42, privacy=True, mode=PrivacyMode.Secret) == "****"
    assert (
        URLBase.pprint(object, privacy=True, mode=PrivacyMode.Secret) == "****"
    )
    assert URLBase.pprint("", privacy=True, mode=PrivacyMode.Secret) == "****"
    assert URLBase.pprint("a", privacy=True, mode=PrivacyMode.Secret) == "****"
    assert (
        URLBase.pprint("ab", privacy=True, mode=PrivacyMode.Secret) == "****"
    )
    assert (
        URLBase.pprint("abcdefghijk", privacy=True, mode=PrivacyMode.Secret)
        == "****"
    )

    # PrivacyMode.Outer
    assert URLBase.pprint(None, privacy=True, mode=PrivacyMode.Outer) == ""
    assert URLBase.pprint(42, privacy=True, mode=PrivacyMode.Outer) == ""
    assert URLBase.pprint(object, privacy=True, mode=PrivacyMode.Outer) == ""
    assert URLBase.pprint("", privacy=True, mode=PrivacyMode.Outer) == ""
    assert URLBase.pprint("a", privacy=True, mode=PrivacyMode.Outer) == "a...a"
    assert (
        URLBase.pprint("ab", privacy=True, mode=PrivacyMode.Outer) == "a...b"
    )
    assert (
        URLBase.pprint("abcdefghijk", privacy=True, mode=PrivacyMode.Outer)
        == "a...k"
    )

    # PrivacyMode.Tail
    assert URLBase.pprint(None, privacy=True, mode=PrivacyMode.Tail) == ""
    assert URLBase.pprint(42, privacy=True, mode=PrivacyMode.Tail) == ""
    assert URLBase.pprint(object, privacy=True, mode=PrivacyMode.Tail) == ""
    assert URLBase.pprint("", privacy=True, mode=PrivacyMode.Tail) == ""
    assert URLBase.pprint("a", privacy=True, mode=PrivacyMode.Tail) == "...a"
    assert URLBase.pprint("ab", privacy=True, mode=PrivacyMode.Tail) == "...ab"
    assert (
        URLBase.pprint("abcdefghijk", privacy=True, mode=PrivacyMode.Tail)
        == "...hijk"
    )

    # Quoting settings
    assert URLBase.pprint(" ", privacy=False, safe="") == "%20"
    assert URLBase.pprint(" ", privacy=False, quote=False, safe="") == " "


@mock.patch("requests.get")
@mock.patch("requests.post")
def test_apprise_tagging(mock_post, mock_get):
    """
    API: Apprise() object tagging functionality

    """

    def do_notify(server, *args, **kwargs):
        return server.notify(*args, **kwargs)

    apprise_tagging_test(mock_post, mock_get, do_notify)


@mock.patch("requests.get")
@mock.patch("requests.post")
def test_apprise_tagging_async(mock_post, mock_get):
    """
    API: Apprise() object tagging functionality asynchronous methods

    """
    with OuterEventLoop() as loop:

        def do_notify(server, *args, **kwargs):
            return loop.run_until_complete(
                server.async_notify(*args, **kwargs)
            )

        apprise_tagging_test(mock_post, mock_get, do_notify)


def apprise_tagging_test(mock_post, mock_get, do_notify):
    # A request
    robj = mock.Mock()
    robj.raw = mock.Mock()
    # Allow raw.read() calls
    robj.raw.read.return_value = ""
    robj.text = ""
    robj.content = ""
    mock_get.return_value = robj
    mock_post.return_value = robj

    # Simulate a successful notification
    mock_get.return_value.status_code = requests.codes.ok
    mock_post.return_value.status_code = requests.codes.ok

    # Create our object
    a = Apprise()

    # An invalid addition can't add the tag
    assert a.add("averyinvalidschema://localhost", tag="uhoh") is False
    assert (
        a.add(
            {"schema": "averyinvalidschema", "host": "localhost"}, tag="uhoh"
        )
        is False
    )

    # Add entry and assign it to a tag called 'awesome'
    assert a.add("json://localhost/path1/", tag="awesome") is True
    assert (
        a.add(
            {"schema": "json", "host": "localhost", "fullpath": "/path1/"},
            tag="awesome",
        )
        is True
    )

    # Add another notification and assign it to a tag called 'awesome'
    # and another tag called 'local'
    assert a.add("json://localhost/path2/", tag=["mmost", "awesome"]) is True

    # notify the awesome tag; this would notify both services behind the
    # scenes
    assert (
        do_notify(a, title="my title", body="my body", tag="awesome") is True
    )

    # notify all of the tags
    assert (
        do_notify(
            a, title="my title", body="my body", tag=["awesome", "mmost"]
        )
        is True
    )

    # When we query against our loaded notifications for a tag that simply
    # isn't assigned to anything, we return None.  None (different then False)
    # tells us that we litterally had nothing to query.  We didn't fail...
    # but we also didn't do anything...
    assert (
        do_notify(a, title="my title", body="my body", tag="missing") is None
    )

    # Now to test the ability to and and/or notifications
    a = Apprise()

    # Add a tag by tuple
    assert a.add("json://localhost/tagA/", tag=("TagA",)) is True
    # Add 2 tags by string
    assert a.add("json://localhost/tagAB/", tag="TagA, TagB") is True
    # Add a tag using a set
    assert a.add("json://localhost/tagB/", tag={"TagB"}) is True
    # Add a tag by string (again)
    assert a.add("json://localhost/tagC/", tag="TagC") is True
    # Add 2 tags using a list
    assert a.add("json://localhost/tagCD/", tag=["TagC", "TagD"]) is True
    # Add a tag by string (again)
    assert a.add("json://localhost/tagD/", tag="TagD") is True
    # add a tag set by set (again)
    assert (
        a.add("json://localhost/tagCDE/", tag={"TagC", "TagD", "TagE"}) is True
    )

    # Expression: TagC and TagD
    # Matches the following only:
    #   - json://localhost/tagCD/
    #   - json://localhost/tagCDE/
    assert (
        do_notify(a, title="my title", body="my body", tag=[("TagC", "TagD")])
        is True
    )

    # Expression: (TagY and TagZ) or TagX
    # Matches nothing, None is returned in this case
    assert (
        do_notify(
            a, title="my title", body="my body", tag=[("TagY", "TagZ"), "TagX"]
        )
        is None
    )

    # Expression: (TagY and TagZ) or TagA
    # Matches the following only:
    #   - json://localhost/tagAB/
    assert (
        do_notify(
            a, title="my title", body="my body", tag=[("TagY", "TagZ"), "TagA"]
        )
        is True
    )

    # Expression: (TagE and TagD) or TagB
    # Matches the following only:
    #   - json://localhost/tagCDE/
    #   - json://localhost/tagAB/
    #   - json://localhost/tagB/
    assert (
        do_notify(
            a, title="my title", body="my body", tag=[("TagE", "TagD"), "TagB"]
        )
        is True
    )

    # Garbage Entries in tag field just get stripped out. the below
    # is the same as notifying no tags at all. Since we have not added
    # any entries that do not have tags (that we can match against)
    # we fail.  None is returned as a way of letting us know that we
    # had Notifications to notify, but since none of them matched our tag
    # none were notified.
    assert (
        do_notify(
            a,
            title="my title",
            body="my body",
            tag=[
                (object,),
            ],
        )
        is None
    )


def test_apprise_schemas():
    """
    API: Apprise().schema() tests

    """
    # Clear loaded modules
    a = Apprise()

    # no items
    assert len(a) == 0

    class TextNotification(NotifyBase):
        # set our default notification format
        notify_format = NotifyFormat.TEXT

        # Garbage Protocol Entries
        protocol = None

        secure_protocol = (None, object)

    class HtmlNotification(NotifyBase):

        protocol = ("html", "htm")

        secure_protocol = ("htmls", "htms")

    class MarkDownNotification(NotifyBase):

        protocol = "markdown"

        secure_protocol = "markdowns"

    schemas = URLBase.schemas(TextNotification)
    assert isinstance(schemas, set) is True
    # We didn't define a protocol or secure protocol
    assert len(schemas) == 0

    # Store our notifications into our schema map
    N_MGR["text"] = TextNotification
    N_MGR["html"] = HtmlNotification
    N_MGR["markdown"] = MarkDownNotification

    schemas = URLBase.schemas(TextNotification)
    assert isinstance(schemas, set) is True
    # We didn't define a protocol or secure protocol one
    # but one got assigned in he above N_MGR call
    assert len(schemas) == 1
    assert "text" in schemas

    schemas = URLBase.schemas(HtmlNotification)
    assert isinstance(schemas, set) is True
    assert len(schemas) == 4
    assert "html" in schemas
    assert "htm" in schemas
    assert "htmls" in schemas
    assert "htms" in schemas

    # Invalid entries do not disrupt schema calls
    for garbage in (object(), None, 42):
        schemas = URLBase.schemas(garbage)
        assert isinstance(schemas, set) is True
        assert len(schemas) == 0


def test_apprise_urlbase_object():
    """
    API: Apprise() URLBase object testing

    """
    results = URLBase.parse_url("https://localhost/path/?cto=3.0&verify=no")
    assert results.get("user") is None
    assert results.get("password") is None
    assert results.get("path") == "/path/"
    assert results.get("secure") is True
    assert results.get("verify") is False
    base = URLBase(**results)
    assert base.request_timeout == (3.0, 4.0)
    assert base.request_auth is None
    assert base.request_url == "https://localhost/path/"
    assert base.url().startswith("https://localhost/")

    results = URLBase.parse_url(
        "http://user:pass@localhost:34/path/here?rto=3.0&verify=yes"
    )
    assert results.get("user") == "user"
    assert results.get("password") == "pass"
    assert results.get("fullpath") == "/path/here"
    assert results.get("secure") is False
    assert results.get("verify") is True
    base = URLBase(**results)
    assert base.request_timeout == (4.0, 3.0)
    assert base.request_auth == ("user", "pass")
    assert base.request_url == "http://localhost:34/path/here"
    assert base.url().startswith("http://user:pass@localhost:34/path/here")

    results = URLBase.parse_url("http://user@127.0.0.1/path/")
    assert results.get("user") == "user"
    assert results.get("password") is None
    assert results.get("fullpath") == "/path/"
    assert results.get("secure") is False
    assert results.get("verify") is True
    base = URLBase(**results)
    assert base.request_timeout == (4.0, 4.0)
    assert base.request_auth == ("user", None)
    assert base.request_url == "http://127.0.0.1/path/"
    assert base.url().startswith("http://user@127.0.0.1/path/")

    # Generic initialization
    base = URLBase(**{"schema": ""})
    assert base.request_timeout == (4.0, 4.0)
    assert base.request_auth is None
    assert base.request_url == "http:///"
    assert base.url().startswith("http:///")

    base = URLBase()
    assert base.request_timeout == (4.0, 4.0)
    assert base.request_auth is None
    assert base.request_url == "http:///"
    assert base.url().startswith("http:///")


def test_apprise_unique_id():
    """
    API: Apprise() Input Formats tests

    """

    # Default testing
    obj1 = Apprise.instantiate("json://user@127.0.0.1/path")
    obj2 = Apprise.instantiate("json://user@127.0.0.1/path/?arg=")

    assert obj1.url_identifier == obj2.url_identifier
    assert obj1.url_id() == obj2.url_id()
    # Second call leverages lazy reference (so it's much faster
    assert obj1.url_id() == obj2.url_id()
    # Disable Lazy Setting
    assert obj1.url_id(lazy=False) == obj2.url_id(lazy=False)

    # A variation such as providing a password or altering the path makes the
    # url_id() different:
    obj2 = Apprise.instantiate("json://user@127.0.0.1/path2/?arg=")  # path
    assert obj1.url_id() != obj2.url_id()
    obj2 = Apprise.instantiate(
        "jsons://user@127.0.0.1/path/?arg="
    )  # secure flag
    assert obj1.url_id() != obj2.url_id()
    obj2 = Apprise.instantiate("json://user2@127.0.0.1/path/?arg=")  # user
    assert obj1.url_id() != obj2.url_id()
    obj2 = Apprise.instantiate("json://user@127.0.0.1:8080/path/?arg=")  # port
    assert obj1.url_id() != obj2.url_id()
    obj2 = Apprise.instantiate(
        "json://user:pass@127.0.0.1/path/?arg="
    )  # password
    assert obj1.url_id() != obj2.url_id()

    # Leverage salt setting
    asset = AppriseAsset(storage_salt="abcd")

    obj2 = Apprise.instantiate("json://user@127.0.0.1/path/", asset=asset)
    assert obj1.url_id(lazy=False) != obj2.url_id(lazy=False)

    asset = AppriseAsset(storage_salt=b"abcd")
    # same salt value produces a match again
    obj1 = Apprise.instantiate("json://user@127.0.0.1/path/", asset=asset)
    assert obj1.url_id() == obj2.url_id()

    # We'll add a good notification to our list
    class TesNoURLID(NotifyBase):
        """This class is just sets a use case where we don't return a
        url_identifier."""

        # we'll use this as a key to make our service easier to find
        # in the next part of the testing
        service_name = "nourl"

        _url_identifier = False

        def send(self, **kwargs):
            # Pretend everything is okay (so we don't break other tests)
            return True

        @staticmethod
        def parse_url(url):
            return NotifyBase.parse_url(url, verify_host=False)

        @property
        def url_identifier(self):
            """No URL Identifier."""
            return self._url_identifier

    N_MGR["nourl"] = TesNoURLID

    # setting URL Identifier to False disables the generator
    url = "nourl://"
    obj = Apprise.instantiate(url)
    # No generation takes place
    assert obj.url_id() is None

    #
    # Dictionary Testing
    #
    obj._url_identifier = {
        "abc": "123",
        "def": b"\0",
        "hij": 42,
        "klm": object,
    }
    # call uses cached value (from above)
    assert obj.url_id() is None
    # Tests dictionary key generation
    assert obj.url_id(lazy=False) is not None

    # List/Set/Tuple Testing
    #
    obj1 = Apprise.instantiate(url)
    obj1._url_identifier = ["123", b"\0", 42, object]
    # Tests dictionary key generation
    assert obj1.url_id() is not None

    obj2 = Apprise.instantiate(url)
    obj2._url_identifier = ("123", b"\0", 42, object)
    assert obj2.url_id() is not None
    assert obj1.url_id() == obj2.url_id()

    obj3 = Apprise.instantiate(url)
    obj3._url_identifier = {"123", b"\0", 42, object}
    assert obj3.url_id() is not None

    obj = Apprise.instantiate(url)
    obj._url_identifier = b"test"
    assert obj.url_id() is not None

    obj = Apprise.instantiate(url)
    obj._url_identifier = "test"
    assert obj.url_id() is not None

    # Testing Garbage
    for x in (31, object, 43.1):
        obj = Apprise.instantiate(url)
        obj._url_identifier = x
        assert obj.url_id() is not None


def test_apprise_notify_formats():
    """
    API: Apprise() Input Formats tests

    """
    # Need to set async_mode=False to call notify() instead of async_notify().
    asset = AppriseAsset(async_mode=False)

    a = Apprise(asset=asset)

    # no items
    assert len(a) == 0

    class TextNotification(NotifyBase):
        # set our default notification format
        notify_format = NotifyFormat.TEXT

        def __init__(self, **kwargs):
            super().__init__(**kwargs)

        def notify(self, **kwargs):
            # Pretend everything is okay
            return True

    class HtmlNotification(NotifyBase):
        # set our default notification format
        notify_format = NotifyFormat.HTML

        def __init__(self, **kwargs):
            super().__init__(**kwargs)

        def notify(self, **kwargs):
            # Pretend everything is okay
            return True

    class MarkDownNotification(NotifyBase):
        # set our default notification format
        notify_format = NotifyFormat.MARKDOWN

        def __init__(self, **kwargs):
            super().__init__(**kwargs)

        def notify(self, **kwargs):
            # Pretend everything is okay
            return True

    # Store our notifications into our schema map
    N_MGR["text"] = TextNotification
    N_MGR["html"] = HtmlNotification
    N_MGR["markdown"] = MarkDownNotification

    # Test Markdown; the above calls the markdown because our good://
    # defined plugin above was defined to default to HTML which triggers
    # a markdown to take place if the body_format specified on the notify
    # call
    assert a.add("html://localhost") is True
    assert a.add("html://another.server") is True
    assert a.add("html://and.another") is True
    assert a.add("text://localhost") is True
    assert a.add("text://another.server") is True
    assert a.add("text://and.another") is True
    assert a.add("markdown://localhost") is True
    assert a.add("markdown://another.server") is True
    assert a.add("markdown://and.another") is True

    assert len(a) == 9

    assert (
        a.notify(
            title="markdown",
            body="## Testing Markdown",
            body_format=NotifyFormat.MARKDOWN,
        )
        is True
    )

    assert (
        a.notify(
            title="text", body="Testing Text", body_format=NotifyFormat.TEXT
        )
        is True
    )

    assert (
        a.notify(
            title="html", body="<b>HTML</b>", body_format=NotifyFormat.HTML
        )
        is True
    )


def test_apprise_asset(tmpdir):
    """
    API: AppriseAsset() object

    """
    a = AppriseAsset(theme="light")
    # Default theme
    assert a.theme == "light"

    # Invalid kw handling
    with pytest.raises(AttributeError):
        AppriseAsset(invalid_kw="value")

    a = AppriseAsset(
        theme="dark",
        image_path_mask="/{THEME}/{TYPE}-{XY}{EXTENSION}",
        image_url_mask="http://localhost/{THEME}/{TYPE}-{XY}{EXTENSION}",
    )

    a.default_html_color = "#abcabc"

    assert a.color("invalid", tuple) == (171, 202, 188)
    assert a.color(NotifyType.INFO, tuple) == (58, 163, 227)

    assert a.color("invalid", int) == 11258556
    assert a.color(NotifyType.INFO, int) == 3843043

    assert a.color("invalid", None) == "#abcabc"
    assert a.color(NotifyType.INFO, None) == "#3AA3E3"
    # None is the default
    assert a.color(NotifyType.INFO) == "#3AA3E3"

    # Invalid Type
    with pytest.raises(ValueError):
        # The exception we expect since dict is not supported
        a.color(NotifyType.INFO, dict)

    # Test our ASCII mappings
    assert a.ascii("invalid") == "[?]"
    assert a.ascii(NotifyType.INFO) == "[i]"
    assert a.ascii(NotifyType.SUCCESS) == "[+]"
    assert a.ascii(NotifyType.WARNING) == "[~]"
    assert a.ascii(NotifyType.FAILURE) == "[!]"

    # Invalid Type
    with pytest.raises(ValueError):
        # The exception we expect since dict is not supported
        a.color(NotifyType.INFO, dict)

    assert (
        a.image_url(NotifyType.INFO, NotifyImageSize.XY_128)
        == "http://localhost/dark/info-128x128.png"
    )

    # Default image size
    assert (
        a.image_url(NotifyType.INFO)
        == "http://localhost/dark/info-256x256.png"
    )

    assert (
        a.image_path(NotifyType.INFO, NotifyImageSize.XY_256, must_exist=False)
        == "/dark/info-256x256.png"
    )

    # This path doesn't exist so image_raw will fail (since we just
    # randompyl picked it for testing)
    assert a.image_raw(NotifyType.INFO, NotifyImageSize.XY_256) is None

    assert (
        a.image_path(NotifyType.INFO, NotifyImageSize.XY_256, must_exist=True)
        is None
    )

    # Create a new object (with our default settings)
    a = AppriseAsset()

    # Our default configuration can access our file
    assert (
        a.image_path(NotifyType.INFO, NotifyImageSize.XY_256, must_exist=True)
        is not None
    )

    assert a.image_raw(NotifyType.INFO, NotifyImageSize.XY_256) is not None

    # Create a temporary directory
    sub = tmpdir.mkdir("great.theme")

    # Write a file
    sub.join(
        f"{NotifyType.INFO.value}-"
        f"{NotifyImageSize.XY_256.value}.png").write(
            "the content doesn't matter for testing."
    )

    # Create an asset that will reference our file we just created
    a = AppriseAsset(
        theme="great.theme",
        image_path_mask=(
            f"{dirname(sub.strpath)}/{{THEME}}/{{TYPE}}-{{XY}}.png"
        ),
    )

    # We'll be able to read file we just created
    assert a.image_raw(NotifyType.INFO, NotifyImageSize.XY_256) is not None

    # We can retrieve the filename at this point even with must_exist set
    # to True
    assert (
        a.image_path(NotifyType.INFO, NotifyImageSize.XY_256, must_exist=True)
        is not None
    )

    # Test case where we can't access the image file
    with mock.patch("builtins.open", side_effect=OSError()):
        assert a.image_raw(NotifyType.INFO, NotifyImageSize.XY_256) is None

    # Our content is retrivable again
    assert a.image_raw(NotifyType.INFO, NotifyImageSize.XY_256) is not None

    # Disable all image references
    a = AppriseAsset(image_path_mask=False, image_url_mask=False)

    # We always return none in these calls now
    assert a.image_raw(NotifyType.INFO, NotifyImageSize.XY_256) is None
    assert a.image_url(NotifyType.INFO, NotifyImageSize.XY_256) is None
    assert (
        a.image_path(NotifyType.INFO, NotifyImageSize.XY_256, must_exist=False)
        is None
    )
    assert (
        a.image_path(NotifyType.INFO, NotifyImageSize.XY_256, must_exist=True)
        is None
    )

    # Test our default extension out
    a = AppriseAsset(
        image_path_mask="/{THEME}/{TYPE}-{XY}{EXTENSION}",
        image_url_mask="http://localhost/{THEME}/{TYPE}-{XY}{EXTENSION}",
        default_extension=".jpeg",
    )
    assert (
        a.image_path(NotifyType.INFO, NotifyImageSize.XY_256, must_exist=False)
        == "/default/info-256x256.jpeg"
    )

    assert (
        a.image_url(NotifyType.INFO, NotifyImageSize.XY_256)
        == "http://localhost/default/info-256x256.jpeg"
    )

    # extension support
    assert (
        a.image_path(
            NotifyType.INFO,
            NotifyImageSize.XY_128,
            must_exist=False,
            extension=".ico",
        )
        == "/default/info-128x128.ico"
    )

    assert (
        a.image_url(NotifyType.INFO, NotifyImageSize.XY_256, extension=".test")
        == "http://localhost/default/info-256x256.test"
    )

    a = AppriseAsset(plugin_paths=("/tmp",))
    assert a.plugin_paths == ("/tmp",)


def test_apprise_disabled_plugins():
    """
    API: Apprise() Disabled Plugin States

    """
    # Ensure there are no other drives loaded
    N_MGR.unload_modules(disable_native=True)
    assert len(N_MGR) == 0

    class TestDisabled01Notification(NotifyBase):
        """This class is used to test a pre-disabled state."""

        # Just flat out disable our service
        enabled = False

        # we'll use this as a key to make our service easier to find
        # in the next part of the testing
        service_name = "na01"

        def notify(self, **kwargs):
            # Pretend everything is okay (so we don't break other tests)
            return True

    N_MGR["na01"] = TestDisabled01Notification

    class TestDisabled02Notification(NotifyBase):
        """This class is used to test a post-disabled state."""

        # we'll use this as a key to make our service easier to find
        # in the next part of the testing
        service_name = "na02"

        def __init__(self, *args, **kwargs):
            super().__init__(**kwargs)

            # enable state changes **AFTER** we initialize
            self.enabled = False

        def notify(self, **kwargs):
            # Pretend everything is okay (so we don't break other tests)
            return True

    N_MGR["na02"] = TestDisabled02Notification

    # Create our Apprise instance
    a = Apprise()

    result = a.details(lang="ca-en", show_disabled=True)
    assert isinstance(result, dict)
    assert "schemas" in result
    assert len(result["schemas"]) == 2

    # our na01 is disabled right from the get-go
    entry = next(
        (x for x in result["schemas"] if x["service_name"] == "na01"), None
    )
    assert entry is not None
    assert entry["enabled"] is False

    plugin = a.instantiate("na01://localhost")
    # Object is just flat out disabled... nothing is instatiated
    assert plugin is None

    # our na02 isn't however until it's initialized; as a result
    # it get's returned in our result set
    entry = next(
        (x for x in result["schemas"] if x["service_name"] == "na02"), None
    )
    assert entry is not None
    assert entry["enabled"] is True

    plugin = a.instantiate("na02://localhost")
    # Object isn't disabled until the __init__() call.  But this is still
    # enough to not instantiate the object:
    assert plugin is None

    # If we choose to filter our disabled, we can't unfortunately filter those
    # that go disabled after instantiation, but we do filter out any that are
    # already known to not be enabled:
    result = a.details(lang="ca-en", show_disabled=False)
    assert isinstance(result, dict)
    assert "schemas" in result
    assert len(result["schemas"]) == 1

    # We'll add a good notification to our list
    class TesEnabled01Notification(NotifyBase):
        """This class is just a simple enabled one."""

        # we'll use this as a key to make our service easier to find
        # in the next part of the testing
        service_name = "good"

        def send(self, **kwargs):
            # Pretend everything is okay (so we don't break other tests)
            return True

    N_MGR["good"] = TesEnabled01Notification

    # The last thing we'll simulate is a case where the plugin is just
    # disabled at a later time long into it's life.  this is just to allow
    # administrators to stop the flow of their notifications for their own
    # given reasons.
    plugin = a.instantiate("good://localhost")
    assert isinstance(plugin, NotifyBase)

    # we'll toggle our state
    plugin.enabled = False

    # As a result, we can now no longer send a notification:
    assert plugin.notify("My Message") is False

    # As just a proof of how you can toggle the state back:
    plugin.enabled = True

    # our notifications will go okay now
    assert plugin.notify("My Message") is True

    # Restore our modules
    N_MGR.unload_modules()


def test_apprise_details():
    """
    API: Apprise() Details

    """

    # This is a made up class that is just used to verify
    class TestDetailNotification(NotifyBase):
        """This class is used to test various configurations supported."""

        # Minimum requirements for a plugin to produce details
        service_name = "Detail Testing"

        # The default simple (insecure) protocol (used by NotifyMail)
        protocol = "details"

        # Set test_bool flag
        always_true = True
        always_false = False

        # Define object templates
        templates = (
            "{schema}://{host}",
            "{schema}://{host}:{port}",
            "{schema}://{user}@{host}:{port}",
            "{schema}://{user}:{pass}@{host}:{port}",
        )

        # Define our tokens; these are the minimum tokens required required to
        # be passed into this function (as arguments). The syntax appends any
        # previously defined in the base package and builds onto them
        template_tokens = dict(
            NotifyBase.template_tokens,
            **{
                "notype": {
                    # name is a minimum requirement
                    "name": _("no type"),
                },
                "regex_test01": {
                    "name": _("RegexTest"),
                    "type": "string",
                    "regex": r"^[A-Z0-9]$",
                },
                "regex_test02": {
                    "name": _("RegexTest"),
                    # Support regex options too
                    "regex": (r"^[A-Z0-9]$", "i"),
                },
                "regex_test03": {
                    "name": _("RegexTest"),
                    # Support regex option without a second option
                    "regex": r"^[A-Z0-9]$",
                },
                "regex_test04": {
                    # this entry would just end up getting removed
                    "regex": None,
                },
                # List without delimiters (causes defaults to kick in)
                "mylistA": {
                    "name": "fruit",
                    "type": "list:string",
                },
                # A list with a delimiter list
                "mylistB": {
                    "name": "softdrinks",
                    "type": "list:string",
                    "delim": ["|", "-"],
                },
            },
        )

        template_args = dict(
            NotifyBase.template_args,
            **{
                # Test _exist_if logic
                "test_exists_if_01": {
                    "name": "Always False",
                    "type": "bool",
                    # Provide a default
                    "default": False,
                    # Base the existance of this key/value entry on the lookup
                    # of this class value at runtime. Hence:
                    #     if not NotifyObject.always_false
                    #         del this_entry
                    #
                    "_exists_if": "always_false",
                },
                # Test _exist_if logic
                "test_exists_if_02": {
                    "name": "Always True",
                    "type": "bool",
                    # Provide a default
                    "default": False,
                    # Base the existance of this key/value entry on the lookup
                    # of this class value at runtime. Hence:
                    #     if not NotifyObject.always_true
                    #         del this_entry
                    #
                    "_exists_if": "always_true",
                },
                # alias_of testing
                "test_alias_of": {"alias_of": "mylistB", "delim": ("-", " ")},
            },
        )

        def send(self, **kwargs):
            # Pretend everything is okay (so we don't break other tests)
            return True

    # Store our good detail notification in our schema map
    N_MGR["details"] = TestDetailNotification

    # This is a made up class that is just used to verify
    class TestReq01Notification(NotifyBase):
        """This class is used to test various requirement configurations."""

        # Set some requirements
        requirements: ClassVar[RequirementsSpec] = {
            "packages_required": [
                "cryptography <= 3.4",
                "ultrasync",
            ],
            "packages_recommended": "django",
        }

        def send(self, **kwargs):
            # Pretend everything is okay (so we don't break other tests)
            return True

    N_MGR["req01"] = TestReq01Notification

    # This is a made up class that is just used to verify
    class TestReq02Notification(NotifyBase):
        """This class is used to test various requirement configurations."""

        # Just not enabled at all
        enabled = False

        # Set some requirements
        requirements: ClassVar[RequirementsSpec] = {
            # None and/or [] is implied, but jsut to show that the code won't
            # crash if explicitly set this way:
            "packages_required": None,
            "packages_recommended": [
                "cryptography <= 3.4",
            ],
        }

        def send(self, **kwargs):
            # Pretend everything is okay (so we don't break other tests)
            return True

    N_MGR["req02"] = TestReq02Notification

    # This is a made up class that is just used to verify
    class TestReq03Notification(NotifyBase):
        """This class is used to test various requirement configurations."""

        # Set some requirements
        requirements: ClassVar[RequirementsSpec] = {
            # We can over-ride the default details assigned to our plugin if
            # specified
            "details": _("some specified requirement details"),
            # We can set a string value as well (it does not have to be a list)
            "packages_recommended": "cryptography <= 3.4",
        }

        def send(self, **kwargs):
            # Pretend everything is okay (so we don't break other tests)
            return True

    N_MGR["req03"] = TestReq03Notification

    # This is a made up class that is just used to verify
    class TestReq04Notification(NotifyBase):
        """This class is used to test a case where our requirements is fixed to
        a None."""

        # This is the same as saying there are no requirements
        requirements = None

        def send(self, **kwargs):
            # Pretend everything is okay (so we don't break other tests)
            return True

    N_MGR["req04"] = TestReq04Notification

    # This is a made up class that is just used to verify
    class TestReq05Notification(NotifyBase):
        """This class is used to test a case where only packages_recommended is
        identified."""

        requirements: ClassVar[RequirementsSpec] = {
            # We can set a string value as well (it does not have to be a list)
            "packages_recommended": "cryptography <= 3.4"
        }

        def send(self, **kwargs):
            # Pretend everything is okay (so we don't break other tests)
            return True

    N_MGR["req05"] = TestReq05Notification

    # Create our Apprise instance
    a = Apprise()

    # Dictionary response
    result = a.details()
    assert isinstance(result, dict)

    # Test different variations of our call
    result = a.details(lang="ca-fr")
    assert isinstance(result, dict)
    for entry in result["schemas"]:
        # Verify our key does not exist because we did not ask for it
        assert "enabled" not in entry
        assert "requirements" not in entry

    result = a.details(lang="us-en", show_requirements=True)
    assert isinstance(result, dict)
    for entry in result["schemas"]:
        # Verify our key does not exist because we did not ask for it
        assert "enabled" not in entry

        # Requirements are set for display
        assert "requirements" in entry
        assert "details" in entry["requirements"]
        assert "packages_required" in entry["requirements"]
        assert "packages_recommended" in entry["requirements"]
        assert isinstance(
            entry["requirements"]["details"], (str, LazyTranslation)
        )
        assert isinstance(entry["requirements"]["packages_required"], list)
        assert isinstance(entry["requirements"]["packages_recommended"], list)

    result = a.details(lang="ca-en", show_disabled=True)
    assert isinstance(result, dict)
    for entry in result["schemas"]:
        # Verify that our plugin state is available to us
        assert "enabled" in entry
        assert isinstance(entry["enabled"], bool)

        # Verify our key does not exist because we did not ask for it
        assert "requirements" not in entry

    result = a.details(
        lang="ca-fr", show_requirements=True, show_disabled=True
    )
    assert isinstance(result, dict)
    for entry in result["schemas"]:
        # Plugin States are set for display
        assert "enabled" in entry
        assert isinstance(entry["enabled"], bool)

        # Requirements are set for display
        assert "requirements" in entry
        assert "details" in entry["requirements"]
        assert "packages_required" in entry["requirements"]
        assert "packages_recommended" in entry["requirements"]
        assert isinstance(
            entry["requirements"]["details"], (str, LazyTranslation)
        )
        assert isinstance(entry["requirements"]["packages_required"], list)
        assert isinstance(entry["requirements"]["packages_recommended"], list)


def test_apprise_details_plugin_verification():
    """
    API: Apprise() Details Plugin Verification

    """
    # Prepare our object
    a = Apprise()

    # Details object
    details = a.details()

    # Dictionary response
    assert isinstance(details, dict)

    # Details object with language defined:
    details = a.details(lang="en")

    # Dictionary response
    assert isinstance(details, dict)

    # Details object with unsupported language:
    details = a.details(lang="xx")

    # Dictionary response
    assert isinstance(details, dict)

    # Apprise version
    assert "version" in details
    assert details.get("version") == __version__

    # Defined schemas identify each plugin
    assert "schemas" in details
    assert isinstance(details.get("schemas"), list)

    # We have an entry per defined plugin
    assert "asset" in details
    assert isinstance(details.get("asset"), dict)
    assert "app_id" in details["asset"]
    assert "app_desc" in details["asset"]
    assert "default_extension" in details["asset"]
    assert "theme" in details["asset"]
    assert "image_path_mask" in details["asset"]
    assert "image_url_mask" in details["asset"]
    assert "image_url_logo" in details["asset"]

    # Valid Type Regular Expression Checker
    # Case Sensitive and MUST match the following:
    is_valid_type_re = re.compile(r"((choice|list):)?(string|bool|int|float)")

    # match tokens found in templates so we can cross reference them back
    # to see if they have a matching argument
    template_token_re = re.compile(r"{([^}]+)}[^{]*?(?=$|{)")

    # Define acceptable map_to arguments that can be tied in with the
    # kwargs function definitions.
    valid_kwargs = {
        # General Parameters
        "user",
        "password",
        "port",
        "host",
        "schema",
        "fullpath",
        # NotifyBase parameters:
        "tz",
        "format",
        "overflow",
        "emojis",
        # URLBase parameters:
        "verify",
        "cto",
        "rto",
        "store",
    }

    # Valid Schema Entries:
    valid_schema_keys = (
        "name",
        "private",
        "required",
        "type",
        "values",
        "min",
        "max",
        "regex",
        "default",
        "list",
        "delim",
        "prefix",
        "map_to",
        "alias_of",
        "group",
    )
    for entry in details["schemas"]:

        # Track the map_to entries (if specified); We need to make sure that
        # these properly map back
        map_to_entries = set()

        # Track the alias_of entries
        map_to_aliases = set()

        # A Service Name MUST be defined
        assert "service_name" in entry
        assert isinstance(entry["service_name"], (str, LazyTranslation))

        # Acquire our protocols
        protocols = parse_list(entry["protocols"], entry["secure_protocols"])

        # At least one schema/protocol MUST be defined
        assert len(protocols) > 0

        # our details
        assert "details" in entry
        assert isinstance(entry["details"], dict)

        # All schema details should include args
        for section in ["kwargs", "args", "tokens"]:
            assert section in entry["details"]
            assert isinstance(entry["details"][section], dict)

            for key, arg in entry["details"][section].items():
                # Validate keys (case-sensitive)
                assert len([k for k in arg if k not in valid_schema_keys]) == 0

                # Test our argument
                assert isinstance(arg, dict)

                if "alias_of" not in arg:
                    # Minimum requirement of an argument
                    assert "name" in arg
                    assert isinstance(arg["name"], str)

                    assert "type" in arg
                    assert isinstance(arg["type"], str)
                    assert is_valid_type_re.match(arg["type"]) is not None

                    if "min" in arg:
                        assert arg["type"].endswith("float") or arg[
                            "type"
                        ].endswith("int")
                        assert isinstance(arg["min"], (int, float))

                        if "max" in arg:
                            # If a min and max was specified, at least check
                            # to confirm the min is less then the max
                            assert arg["min"] < arg["max"]

                    if "max" in arg:
                        assert arg["type"].endswith("float") or arg[
                            "type"
                        ].endswith("int")
                        assert isinstance(arg["max"], (int, float))

                    if "private" in arg:
                        assert isinstance(arg["private"], bool)

                    if "required" in arg:
                        assert isinstance(arg["required"], bool)

                    if "prefix" in arg:
                        assert isinstance(arg["prefix"], str)
                        if section == "kwargs":
                            # The only acceptable prefix types for kwargs
                            assert arg["prefix"] in (":", "+", "-")

                    else:
                        # kwargs requires that the 'prefix' is defined
                        assert section != "kwargs"

                    if "map_to" in arg:
                        # must be a string
                        assert isinstance(arg["map_to"], str)
                        # Track our map_to object
                        map_to_entries.add(arg["map_to"])

                    else:
                        map_to_entries.add(key)

                    # Some verification
                    if arg["type"].startswith("choice"):

                        # choice:bool is redundant and should be swapped to
                        # just bool
                        assert not arg["type"].endswith("bool")

                        # Choices require that a values list is provided
                        assert "values" in arg
                        assert isinstance(
                            arg["values"], (list, tuple, frozenset, set))
                        assert len(arg["values"]) > 0

                        # Test default
                        if "default" in arg:
                            # if a default is provided on a choice object,
                            # it better be in the list of values
                            assert arg["default"] in arg["values"]

                    if arg["type"].startswith("bool"):
                        # Boolean choices are less restrictive but require a
                        # default value
                        assert "default" in arg
                        assert isinstance(arg["default"], bool)

                    if "regex" in arg:
                        # Regex must ALWAYS be in the format (regex, option)
                        assert isinstance(arg["regex"], (tuple, list))
                        assert len(arg["regex"]) == 2
                        assert isinstance(arg["regex"][0], str)
                        assert arg["regex"][1] is None or isinstance(
                            arg["regex"][1], str
                        )

                        # Compile the regular expression to verify that it is
                        # valid
                        try:
                            re.compile(arg["regex"][0])

                        except:
                            assert "{} is an invalid regex".format(
                                arg["regex"][0]
                            )

                        # Regex should always start and/or end with ^/$
                        assert (
                            re.match(r"^\^.+?$", arg["regex"][0]) is not None
                        )
                        assert (
                            re.match(r"^.+?\$$", arg["regex"][0]) is not None
                        )

                    if arg["type"].startswith("list"):
                        # Delimiters MUST be defined
                        assert "delim" in arg
                        assert isinstance(arg["delim"], (list, tuple))
                        assert len(arg["delim"]) > 0

                else:  # alias_of is in the object
                    # Ensure we're not already in the tokens section
                    # The alias_of object has no value here
                    assert section != "tokens"

                    # must be a string
                    assert isinstance(arg["alias_of"], (str, list, tuple, set))

                    aliases = (
                        [arg["alias_of"]]
                        if isinstance(arg["alias_of"], str)
                        else arg["alias_of"]
                    )

                    for alias_of in aliases:
                        # Track our alias_of object
                        map_to_aliases.add(alias_of)

                        # We can't be an alias_of ourselves
                        if key == alias_of:
                            # This is acceptable as long as we exist in the
                            # tokens table because that is truely what we map
                            # back to
                            assert key in entry["details"]["tokens"]

                        else:
                            # Throw the problem into an assert tag for
                            # debugging purposes... the mapping is not
                            # acceptable
                            assert key != alias_of

                        # alias_of always references back to tokens
                        assert (
                            alias_of in entry["details"]["tokens"]
                            or alias_of in entry["details"]["args"]
                        )

                        # Find a list directive in our tokens
                        t_match = (
                            entry["details"]["tokens"]
                            .get(alias_of, {})
                            .get("type", "")
                            .startswith("list")
                        )

                        a_match = (
                            entry["details"]["args"]
                            .get(alias_of, {})
                            .get("type", "")
                            .startswith("list")
                        )

                        if not (t_match or a_match):
                            # Ensure the only token we have is the alias_of
                            # hence record should look like as example):
                            # {
                            #    'token': {
                            #      'alias_of': 'apitoken',
                            #    },
                            # }
                            #
                            # Or if it can represent more then one entry; in
                            # this case, one must define a name (to define
                            # grouping).
                            # {
                            #    'token': {
                            #      'name': 'Tokens',
                            #      'alias_of': ('apitoken', 'webtoken'),
                            #    },
                            # }
                            if isinstance(arg["alias_of"], str):
                                assert len(entry["details"][section][key]) == 1
                            else:  # is tuple,list, or set
                                assert len(entry["details"][section][key]) == 2
                                # Must have a name defined to define grouping
                                assert "name" in entry["details"][section][key]

                        else:
                            # We're a list, we allow up to 2 variables
                            # Obviously we have the alias_of entry; that's why
                            # were at this part of the code.  But we can
                            # additionally provide a 'delim' over-ride.
                            assert len(entry["details"][section][key]) <= 2
                            if len(entry["details"][section][key]) == 2:
                                # Verify that it is in fact the 'delim' tag
                                assert (
                                    "delim" in entry["details"][section][key]
                                )
                                # If we do have a delim value set, it must be
                                # of a list/set/tuple type
                                assert isinstance(
                                    entry["details"][section][key]["delim"],
                                    (tuple, set, list),
                                )

        spec = inspect.getfullargspec(N_MGR._schema_map[protocols[0]].__init__)

        function_args = (
            (set(parse_list(spec.varkw)) - {"kwargs"})
            | (set(spec.args) - {"self"})
            | valid_kwargs
        )

        # Iterate over our map_to_entries and make sure that everything
        # maps to a function argument
        for arg in map_to_entries:
            if arg not in function_args:
                # This print statement just makes the error easier to
                # troubleshoot
                raise AssertionError(
                    "{}.__init__() expects a {}=None entry according to "
                    "template configuration".format(
                        N_MGR._schema_map[protocols[0]].__name__, arg
                    )
                )

        # Iterate over all of the function arguments and make sure that
        # it maps back to a key
        function_args -= valid_kwargs
        for arg in function_args:
            if arg not in map_to_entries:
                err = N_MGR._schema_map[protocols[0]].__name__.__init__(arg)
                raise AssertionError(
                    f"{err} found but not defined in the template "
                    "configuration"
                )

        # Iterate over our map_to_aliases and make sure they were defined in
        # either the as a token or arg
        for arg in map_to_aliases:
            assert arg in set(entry["details"]["args"].keys()) | set(
                entry["details"]["tokens"].keys()
            )

        # Template verification
        assert "templates" in entry["details"]
        assert isinstance(entry["details"]["templates"], (set, tuple, list))

        # Iterate over our templates and parse our arguments
        for template in entry["details"]["templates"]:
            # Ensure we've properly opened and closed all of our tokens
            assert template.count("{") == template.count("}")

            expected_tokens = template.count("}")
            args = template_token_re.findall(template)
            assert expected_tokens == len(args)

            # Build a cross reference set of our current defined objects
            defined_tokens = set()
            for key, arg in entry["details"]["tokens"].items():
                defined_tokens.add(key)
                if "alias_of" in arg:
                    defined_tokens.add(arg["alias_of"])

            # We want to make sure all of our defined tokens have been
            # accounted for in at least one defined template
            for arg in args:
                assert arg in set(entry["details"]["args"].keys()) | set(
                    entry["details"]["tokens"].keys()
                )

                # The reverse of the above; make sure that each entry defined
                # in the template_tokens is accounted for in at least one of
                # the defined templates
                assert arg in defined_tokens


@mock.patch("requests.post")
@mock.patch("asyncio.gather", wraps=asyncio.gather)
@mock.patch(
    "concurrent.futures.ThreadPoolExecutor",
    wraps=concurrent.futures.ThreadPoolExecutor,
)
def test_apprise_async_mode(mock_threadpool, mock_gather, mock_post):
    """
    API: Apprise() async_mode tests

    """
    mock_post.return_value.status_code = requests.codes.ok

    # Define some servers
    servers = [
        "xml://localhost",
        "json://localhost",
    ]

    # Default Async Mode is to be enabled
    asset = AppriseAsset()
    assert asset.async_mode is True

    # Load our asset
    a = Apprise(asset=asset)

    # add our servers
    a.add(servers=servers)

    # 2 servers loaded
    assert len(a) == 2

    # Our servers should carry this flag
    for server in a:
        assert server.asset.async_mode is True

    # Send Notifications Asyncronously
    assert a.notify("async") is True

    # Verify our thread pool was created
    assert mock_threadpool.call_count == 1
    mock_threadpool.reset_mock()

    # Provide an over-ride now
    asset = AppriseAsset(async_mode=False)
    assert asset.async_mode is False

    # Load our asset
    a = Apprise(asset=asset)

    # Verify our configuration kept
    assert a.asset.async_mode is False

    # add our servers
    a.add(servers=servers)

    # 2 servers loaded
    assert len(a) == 2

    # Our servers should carry this flag
    for server in a:
        assert server.asset.async_mode is False

    # Send Notifications Syncronously
    assert a.notify("sync") is True
    # Sequential send doesn't require a gather
    assert mock_gather.call_count == 0
    mock_gather.reset_mock()

    # another way of looking a our false set asset configuration
    assert a[0].asset.async_mode is False
    assert a[1].asset.async_mode is False

    # Adjust 1 of the servers async_mode settings
    a[0].asset.async_mode = True
    assert a[0].asset.async_mode is True

    # They all share the same object, so this gets toggled too
    assert a[1].asset.async_mode is True

    # We'll just change this one
    a[1].asset = AppriseAsset(async_mode=False)
    assert a[0].asset.async_mode is True
    assert a[1].asset.async_mode is False

    # Send 1 Notification Syncronously, the other Asyncronously
    assert a.notify("a mixed batch") is True

    # Verify we didn't use a thread pool for a single notification
    assert mock_threadpool.call_count == 0
    mock_threadpool.reset_mock()


def test_notify_matrix_dynamic_importing(tmpdir):
    """
    API: Apprise() Notify Matrix Importing

    """

    # Make our new path valid
    suite = tmpdir.mkdir("apprise_notify_test_suite")
    suite.join("__init__.py").write("")

    module_name = "badnotify"

    # Update our path to point to our new test suite
    sys.path.insert(0, str(suite))

    # Create a base area to work within
    base = suite.mkdir(module_name)
    base.join("__init__.py").write("")

    # Test no app_id
    base.join("NotifyBadFile1.py").write(cleandoc("""
        class NotifyBadFile1:
            pass
        """))

    # No class of the same name
    base.join("NotifyBadFile2.py").write(cleandoc("""
        class BadClassName:
            pass
        """))

    # Exception thrown
    base.join("NotifyBadFile3.py").write("""raise ImportError()""")

    # Utilizes a schema:// already occupied (as string)
    base.join("NotifyGoober.py").write(cleandoc("""
        from apprise import NotifyBase
        class NotifyGoober(NotifyBase):
            # This class tests the fact we have a new class name, but we're
            # trying to over-ride items previously used

            # The default simple (insecure) protocol (used by NotifyMail)
            protocol = ('mailto', 'goober')

            # The default secure protocol (used by NotifyMail)
            secure_protocol = 'mailtos'

            @staticmethod
            def parse_url(url, *args, **kwargs):
                # always parseable
                return ConfigBase.parse_url(url, verify_host=False)
        """))

    # Utilizes a schema:// already occupied (as tuple)
    base.join("NotifyBugger.py").write(cleandoc("""
        from apprise import NotifyBase
        class NotifyBugger(NotifyBase):
            # This class tests the fact we have a new class name, but we're
            # trying to over-ride items previously used

            # The default simple (insecure) protocol (used by NotifyMail), the
            # other isn't
            protocol = ('mailto', 'bugger-test' )

            # The default secure protocol (used by NotifyMail), the other isn't
            secure_protocol = ('mailtos', ['garbage'])

            @staticmethod
            def parse_url(url, *args, **kwargs):
                # always parseable
                return ConfigBase.parse_url(url, verify_host=False)
            """))

    N_MGR.load_modules(path=str(base), name=module_name)
